package com.yiqixuejava.reconciliation.service.impl;

import com.yiqixuejava.reconciliation.annotation.RFiledName;
import com.yiqixuejava.reconciliation.config.RedisUtils;
import com.yiqixuejava.reconciliation.dto.RDetail;
import com.yiqixuejava.reconciliation.entity.ResourceEntity;
import com.yiqixuejava.reconciliation.entity.SourceEntity;
import com.yiqixuejava.reconciliation.exception.ServiceException;
import com.yiqixuejava.reconciliation.service.RApi;
import com.yiqixuejava.reconciliation.service.ReconService;
import com.yiqixuejava.reconciliation.service.ResourceService;
import com.yiqixuejava.reconciliation.service.SourceService;
import com.yiqixuejava.reconciliation.util.BeanConverter;
import com.yiqixuejava.reconciliation.util.CodeUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.lang.reflect.Field;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.yiqixuejava.reconciliation.exception.CoreExceptionEnum.*;

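/**
 * Class description: base implementation of the reconciliation service.
 *
 * By default a source CODE is identical to the name of the bean that parses it,
 * e.g. the parsing bean for source ACOM is named ACOM.
 *
 * @author: like
 * @createDate: 2019-11-08 10:53
 * @version: v1.0
 * @jdk version used: JDK1.8
 * @see RApi
 */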
@Slf4j
@Service
public class RBaseService implements RApi {

    /**
     * Number of parsing beans used by one reconciliation; used as the initial
     * capacity of the bean map built in initBean.
     */
    public static final int BEAN_NUM = 2;
    /**
     * Maximum number of records requested from a parsing bean, and inserted, per batch.
     */
    public static final int MAX_SIZE = 5000;

    /** Timeout handed to the Redis helper calls of this class (30*60; presumably seconds — TODO confirm against RedisUtils). */
    public static final int MAX_TIME_OUT = 30*60;
    /** Maximum number of rows fetched by a single query. */
    public static final int MAX_QUERY_LIMIT = 1000;

    /**
     * Flag value marking the platform side.
     */
    public static final String PLAT = "PLAT";
    /**
     * Flag value marking the channel side.
     */
    private static final String CHANNEL = "CHANNEL";

    /** Name of the Redis hash that holds the per-batch select statements. */
    private static final String BATCHSQL = "batchSql";

    /** Name of the Redis hash that holds the per-batch count statements. */
    private static final String BATCHQUERYSQL = "batchQuerySql";
    /** Name of the Redis hash that registers every key which has to be removed by clearRedis. */
    private static final String NEEDCLEARREDISKEYS = "needClearRedisKeys";

    @Autowired
    private ResourceService resourceService;
    @Autowired
    private SourceService sourceService;
    @Autowired
    private RedisUtils redisUtils;



    /**
     * Runs one complete reconciliation between a platform-side and a channel-side source.
     *
     * Steps, in order: resolve the two parsing beans, prepare them and create a batch
     * number, parse both sides and persist the records, diff the two sides in Redis and
     * finally write the result back onto the differing records.
     *
     * Cleanup always runs: the registered Redis keys are cleared and every resolved
     * parsing bean is destroyed. When the beans could not be resolved at all there is
     * nothing to destroy, and the original exception is propagated unchanged.
     *
     * NOTE(review): clearRedis() removes every key registered in the needClearRedisKeys
     * hash, which includes the job marker written by init(). When init() rejects a call
     * with JOB_IS_RUNNING, this cleanup therefore also removes the marker of the job that
     * is still running — confirm whether that is intended.
     *
     * @param pBeanName platform-side source code (also the name of its parsing bean)
     * @param qBeanName channel-side source code (also the name of its parsing bean)
     * @throws ServiceException rethrown as-is when raised by a step; any other exception
     *                          is logged and replaced by ServiceException(Unknown_EXCEPTION)
     */
    @Override
    public void reconciliation(String pBeanName, String qBeanName) {
        Map<String,ReconService> reconServiceMap = null;
        try{
            // make sure both parsing beans exist
            reconServiceMap = checkBean(pBeanName,qBeanName);
            // initialise the beans and prepare everything the run needs
            String batchNo = init(reconServiceMap,pBeanName,qBeanName);
            // parse both sides and store the records in batches
            analysisFile(pBeanName,getBean(reconServiceMap,pBeanName,PLAT),qBeanName,getBean(reconServiceMap,qBeanName,CHANNEL),batchNo);
            // core diff
            Map<String,String> diffMap = doReconciliation(batchNo);
            // write the diff result back
            batchUpdateResource(batchNo,diffMap);

        }catch (ServiceException e){
            log.error("对账失败 : pBeanName={}, qBeanName={}", pBeanName, qBeanName, e);
            throw e;
        }catch (Exception e){
            // the throwable is passed as the last argument so the stack trace is logged
            log.error(" 发生未知异常 : ", e);
            throw new ServiceException(Unknown_EXCEPTION);
        }finally {
            try {
                clearRedis();
            } finally {
                // checkBean may have thrown before the map was assigned
                if (reconServiceMap != null) {
                    reconServiceMap.forEach((key, service) -> service.destroy());
                }
            }
        }
    }

    /**
     * Deletes every Redis key registered in the needClearRedisKeys hash and removes
     * its registration, so that keys created during a run do not pile up in Redis.
     */
    private void clearRedis() {
        Map<Object, Object> registered = redisUtils.hmget(NEEDCLEARREDISKEYS);
        for (Object field : registered.keySet()) {
            String redisKey = field.toString();
            // drop the key itself, then its entry in the registry hash
            redisUtils.del(redisKey);
            redisUtils.hdel(NEEDCLEARREDISKEYS, redisKey);
        }
    }

    /**
     * Writes the reconciliation result back onto the differing records.
     *
     * Both sides are handled independently because the diff map may contain an entry
     * for the platform side and for the channel side at the same time: records that
     * exist only on the platform side get result "1", records that exist only on the
     * channel side get result "2".
     *
     * @param batchNo batch number of the run (currently not used by the update itself)
     * @param diffMap maps PLAT / CHANNEL to the Redis key of that side's difference set
     */
    private void batchUpdateResource(String batchNo, Map<String,String> diffMap) {
        if(diffMap.containsKey(PLAT)){
            updateResources(diffMap.get(PLAT),"1");
        }
        // deliberately not "else if": a surplus on one side does not exclude one on the other
        if(diffMap.containsKey(CHANNEL)){
            updateResources(diffMap.get(CHANNEL),"2");
        }
    }

    /**
     * Sets the given result on every record whose order number appears in a Redis
     * difference set.
     *
     * Each set member is the "_"-joined field string built by getSelectResourceSql;
     * the text before the first "_" is used as the order number.
     *
     * NOTE(review): the update is filtered by order number only, not by batch number
     * or platform — confirm that order numbers cannot repeat across batches.
     *
     * @param key    Redis key of the difference set
     * @param result result value to store
     */
    private void updateResources(String key,String result) {
        Set<Object> diff = redisUtils.sGet(key);
        if (diff == null || diff.isEmpty()) {
            // nothing to update; also avoids issuing an update with an empty IN list
            log.info("批量更新跳过, 差异集合为空 key = {}", key);
            return;
        }
        Set<String> orderNos = new HashSet<>(diff.size());
        for (Object member : diff) {
            String joined = member.toString();
            int separator = joined.indexOf('_');
            orderNos.add(separator < 0 ? joined : joined.substring(0, separator));
        }

        boolean update = resourceService.lambdaUpdate().set(ResourceEntity::getResult, result).in(ResourceEntity::getOrderNo, orderNos).update();
        log.info("批量更新结果 {}" ,update);
    }

    /**
     * Loads both sides of a batch into Redis and diffs them.
     *
     * @param batchNo batch number of the run
     * @return the map produced by reconciliationCore (side flag to difference-set key)
     */
    private Map<String,String> doReconciliation(String batchNo) {
        return reconciliationCore(loadToRedis(batchNo), batchNo);
    }

    /**
     * Reads all members of a Redis set and returns them as strings.
     *
     * @param setName Redis key of the set
     * @return a new set holding every member cast to String
     */
    private Set<String> getSet(String setName) {
        Set<String> members = new HashSet<>();
        for (Object member : redisUtils.sGet(setName)) {
            members.add((String) member);
        }
        return members;
    }

    /**
     * Diffs the two loaded sides in Redis.
     *
     * Stores "platform minus channel" under "P" + batchNo and "channel minus platform"
     * under "Q" + batchNo. A non-empty difference is put into the returned map and its
     * key is registered in the needClearRedisKeys hash for later cleanup.
     *
     * If a side has no entry in keyMap (loadToRedis may omit a side that has no rows),
     * the key that side would have been stored under (flag + batchNo) is used instead
     * of passing null to Redis; a key that does not exist is expected to behave as an
     * empty set in the difference — confirm against RedisUtils.sdiffstore.
     *
     * @param keyMap  Redis set keys of both sides, keyed by PLAT / CHANNEL
     * @param batchNo batch number of the run
     * @return map from PLAT / CHANNEL to the Redis key of that side's difference set;
     *         a side without differences has no entry
     */
    private Map<String,String> reconciliationCore(Map<String, String> keyMap,String batchNo) {
        String platKey = keyMap.getOrDefault(PLAT, PLAT + batchNo);
        String channelKey = keyMap.getOrDefault(CHANNEL, CHANNEL + batchNo);
        String pDestination = "P"+batchNo;
        String qDestination = "Q"+batchNo;

        long pSdiffstore = redisUtils.sdiffstore(pDestination, platKey, channelKey);
        log.info("平台多的数量 = {}", pSdiffstore);
        long qSdiffstore = redisUtils.sdiffstore(qDestination, channelKey, platKey);
        log.info("通道多的数量 = {}", qSdiffstore);

        Map<String,String> diffMap = new HashMap<>(2);
        if(pSdiffstore > 0){
            diffMap.put(PLAT,pDestination);
            redisUtils.hset(NEEDCLEARREDISKEYS,pDestination,pDestination,MAX_TIME_OUT);
        }
        if(qSdiffstore > 0){
            diffMap.put(CHANNEL,qDestination);
            redisUtils.hset(NEEDCLEARREDISKEYS,qDestination,qDestination,MAX_TIME_OUT);
        }
        return diffMap;
    }

    /**
     * Loads the records of one batch into Redis, platform side first, then channel side.
     *
     * @param batchNo batch number of the run
     * @return the merged result of both per-side loads (side flag to Redis set key)
     */
    private  Map<String,String> loadToRedis(String batchNo) {
        Map<String,String> setKeys = new HashMap<>();
        for (String side : new String[]{PLAT, CHANNEL}) {
            String selectField = getHashBatchSqlKey(batchNo, side);
            String countField = getHashBatchCountSqlKey(batchNo, side);
            setKeys.putAll(loadToRedis(batchNo, selectField, countField, side));
        }
        return setKeys;
    }

    /**
     * Loads one side of a batch from the database into a Redis set.
     *
     * The count statement and the select statement are read from the Redis hashes
     * filled by getAnnotation. Up to MAX_QUERY_LIMIT rows are fetched with a single
     * query; larger results are fetched page by page with "limit offset,size".
     *
     * The returned map always contains the side's set key, even when the side has no
     * rows, so that the caller never ends up with a null key for the later set
     * difference.
     *
     * NOTE(review): the paged statement has no ORDER BY (see getSelectResourceSql), so
     * paging relies on the database returning rows in a stable order — confirm.
     *
     * @param batchNo          batch number of the run
     * @param batchSqlKey      field of the select statement in the batchSql hash
     * @param batchCountSqlKey field of the count statement in the batchQuerySql hash
     * @param flag             side flag (PLAT or CHANNEL)
     * @return single-entry map from the side flag to the Redis set key (flag + batchNo)
     */
    private Map<String,String> loadToRedis(String batchNo,String batchSqlKey,String batchCountSqlKey,String flag) {
        String setKey = flag + batchNo;
        Map<String,String> keyMap = new HashMap<>(2);
        keyMap.put(flag,setKey);

        String countSql = (String)redisUtils.hget(BATCHQUERYSQL, batchCountSqlKey);
        long rowCount = resourceService.execBatchCountPSql(countSql);
        if(rowCount == 0){
            // nothing to load; the set is simply never created
            return keyMap;
        }

        String batchSql = (String)redisUtils.hget(BATCHSQL, batchSqlKey);
        if(rowCount <= MAX_QUERY_LIMIT){
            executeBatchQuery(batchSql, setKey);
            return keyMap;
        }

        // round up so that a partially filled last page is fetched as well
        long pages = (rowCount + MAX_QUERY_LIMIT - 1) / MAX_QUERY_LIMIT;
        for(long page = 0; page < pages; page++){
            String querySql = batchSql + " limit " + page*MAX_QUERY_LIMIT + "," + MAX_QUERY_LIMIT;
            if (executeBatchQuery(querySql, setKey)) {
                // an empty page means there is nothing left to read
                break;
            }
        }
        return keyMap;
    }

    /**
     * Runs one select statement and adds its rows to a Redis set.
     *
     * After a successful insert the set key is registered in the needClearRedisKeys
     * hash so that clearRedis removes it later.
     *
     * @param batchSql statement to execute
     * @param setKey   Redis key of the set receiving the rows
     * @return true when the statement returned no rows (nothing was stored),
     *         false when rows were stored
     * @throws ServiceException REDIS_INSERT_EXCEPTION when the Redis insert reports 0
     */
    private boolean executeBatchQuery(String batchSql, String setKey) {
        Set<String> rows = resourceService.execBatchQuerySql(batchSql);
        boolean nothingFetched = rows == null || rows.isEmpty();
        if (nothingFetched) {
            return true;
        }

        Object[] members = rows.toArray();
        if (redisUtils.sSetAndTime(setKey, MAX_TIME_OUT, members) == 0) {
            throw new ServiceException(REDIS_INSERT_EXCEPTION);
        }
        redisUtils.hset(NEEDCLEARREDISKEYS, setKey, setKey, MAX_TIME_OUT);
        return false;
    }

    /**
     * Pulls all data from both parsing beans and stores it, platform side first.
     *
     * Each side is drained completely before the next one starts.
     *
     * @param pBeanName       platform-side bean name
     * @param platBean        platform-side parsing bean
     * @param channelBeanName channel-side bean name
     * @param channelBean     channel-side parsing bean
     * @param batchNo         batch number of the run
     */
    private void analysisFile(String pBeanName,ReconService platBean,String channelBeanName,ReconService channelBean,String batchNo) throws Exception {
        // getNextData returns false once a side has no more data
        while (getNextData(pBeanName, platBean, batchNo, PLAT)) {
            // keep draining the platform side
        }
        while (getNextData(channelBeanName, channelBean, batchNo, CHANNEL)) {
            // keep draining the channel side
        }
    }

    /**
     * Fetches the next chunk of records from a parsing bean and saves it.
     *
     * Every record is stamped with the batch number, the bean name as source code,
     * the side flag and result 0 before the chunk is saved in one batch. A failed
     * save is only logged; the method still reports that a chunk was processed.
     *
     * @param beanName name of the parsing bean, stored as source code
     * @param bean     the parsing bean itself
     * @param batchNo  batch number of the run
     * @param flag     side flag (PLAT or CHANNEL)
     * @return false when the bean delivered no more data, true otherwise
     * @throws ServiceException LIST_SIZE_EXCEED_MAX when the bean returns more than MAX_SIZE records
     */
    private boolean getNextData(String beanName, ReconService bean, String batchNo,String flag) throws Exception {
        List<RDetail> details = bean.getNextData(MAX_SIZE);
        if (details == null || details.isEmpty()) {
            log.info(beanName  + " 解析已经完成 ");
            return false;
        }
        if (details.size() > MAX_SIZE) {
            throw new ServiceException(LIST_SIZE_EXCEED_MAX);
        }

        List<ResourceEntity> entities = BeanConverter.convert(ResourceEntity.class, details);
        for (ResourceEntity entity : entities) {
            entity.setBatchNo(batchNo);
            entity.setSourceCode(beanName);
            entity.setPlatform(flag);
            entity.setResult(0);
        }

        boolean saved = resourceService.saveBatch(entities, entities.size());
        if (!saved) {
            log.error(beanName  + " 批量插入失败 ");
        } else {
            log.info( " 批次号id： "+batchNo);
            log.info(beanName + " 批量插入成功 ");
        }
        return true;
    }

    /**
     * Prepares a reconciliation run.
     *
     * Looks up both source definitions, refuses to start when a job marker for the
     * same pair of sources already exists in Redis, writes that marker (and registers
     * it for cleanup), hands each parsing bean its source definition and finally
     * creates the batch data via prepard.
     *
     * @param reconServiceMap parsing beans keyed by side flag + bean name
     * @param pBeanName       platform-side bean name / source code
     * @param qBeanName       channel-side bean name / source code
     * @return the batch number of the new run
     * @throws ServiceException SOURCES_NOT_FOUND when a source definition is missing,
     *                          JOB_IS_RUNNING when the job marker already exists
     */
    private String init(Map<String,ReconService> reconServiceMap,String pBeanName ,String qBeanName ) {
        SourceEntity platSource = sourceService.query().eq("source_code", pBeanName).one();
        SourceEntity channelSource = sourceService.query().eq("source_code", qBeanName).one();
        if (platSource == null || channelSource == null) {
            throw new ServiceException(SOURCES_NOT_FOUND);
        }

        // marker key: org id + bean name of the platform side, then of the channel side
        String jobKey = new StringBuilder()
                .append(platSource.getOrgId()).append(pBeanName)
                .append(channelSource.getOrgId()).append(qBeanName)
                .toString();
        if (redisUtils.hasKey(jobKey)) {
            throw new ServiceException(JOB_IS_RUNNING);
        }
        redisUtils.set(jobKey, jobKey, MAX_TIME_OUT);
        redisUtils.hset(NEEDCLEARREDISKEYS, jobKey, jobKey, MAX_TIME_OUT);

        // map keys start with the side flag, which decides the source handed to the bean
        for (Map.Entry<String,ReconService> entry : reconServiceMap.entrySet()) {
            SourceEntity source = entry.getKey().startsWith(PLAT) ? platSource : channelSource;
            entry.getValue().prepare(source);
        }

        return prepard(pBeanName, qBeanName);
    }

    /**
     * Prepares the data the reconciliation platform itself needs for a run:
     * generates a batch number and stores the batch's SQL statements in Redis.
     *
     * @param pBeanName platform-side bean name
     * @param qBeanName channel-side bean name
     * @return the generated batch number
     */
    private String prepard(String pBeanName ,String qBeanName) {
        final String batchNo = CodeUtils.generateCode();
        getAnnotation(batchNo, pBeanName, qBeanName);
        return batchNo;
    }

    /**
     * Builds the select and count statements of both sides for a batch and stores
     * them in the batchSql / batchQuerySql Redis hashes.
     *
     * @param batchNo   batch number of the run
     * @param pBeanName platform-side bean name
     * @param qBeanName channel-side bean name
     */
    private void getAnnotation(String batchNo,String pBeanName ,String qBeanName) {
        // build every statement before anything is written to Redis
        final String platSelect = getSelectResourceSql(batchNo, pBeanName, PLAT);
        final String channelSelect = getSelectResourceSql(batchNo, qBeanName, CHANNEL);
        final String platCount = getSelectResourceCountSql(batchNo, pBeanName, PLAT);
        final String channelCount = getSelectResourceCountSql(batchNo, qBeanName, CHANNEL);

        // platform side: select statement, then count statement
        redisUtils.hset(BATCHSQL, getHashBatchSqlKey(batchNo, PLAT), platSelect, MAX_TIME_OUT);
        redisUtils.hset(BATCHQUERYSQL, getHashBatchCountSqlKey(batchNo, PLAT), platCount, MAX_TIME_OUT);

        // channel side: select statement, then count statement
        redisUtils.hset(BATCHSQL, getHashBatchSqlKey(batchNo, CHANNEL), channelSelect, MAX_TIME_OUT);
        redisUtils.hset(BATCHQUERYSQL, getHashBatchCountSqlKey(batchNo, CHANNEL), channelCount, MAX_TIME_OUT);
    }

    /**
     * Builds the hash field under which a side's select statement is stored.
     *
     * @param batchNo batch number of the run
     * @param flag    side flag; PLAT selects the platform suffix, anything else the channel suffix
     * @return batchNo followed by "PSql" or "QSql"
     */
    private String getHashBatchSqlKey(String batchNo,String flag){
        return batchNo + (flag.equals(PLAT) ? "PSql" : "QSql");
    }

    /**
     * Builds the hash field under which a side's count statement is stored.
     *
     * @param batchNo batch number of the run
     * @param flag    side flag; PLAT selects the platform suffix, anything else the channel suffix
     * @return batchNo followed by "CountPSql" or "CountQSql"
     */
    private String getHashBatchCountSqlKey(String batchNo,String flag){
        return batchNo + (flag.equals(PLAT) ? "CountPSql" : "CountQSql");
    }

    /**
     * Builds the statement that counts the stored records of one side of a batch.
     *
     * The three values are inserted into the statement text as quoted literals.
     *
     * @param batchNo    batch number of the run
     * @param sourceCode source code, i.e. the bean name
     * @param flag       side flag
     * @return the assembled count statement
     */
    private String getSelectResourceCountSql(String batchNo,String sourceCode,String flag ){
        StringBuilder sql = new StringBuilder();
        sql.append("select count(*) from r_resources r where r.batch_no ='").append(batchNo);
        sql.append("' and r.platform = '").append(flag);
        sql.append("' and r.source_code = '").append(sourceCode);
        sql.append("'");
        return sql.toString();
    }

    /**
     * Builds the statement that selects one side of a batch as "_"-joined strings.
     *
     * Every field of RDetail annotated with RFiledName contributes its column (the
     * field name converted by underline), in ascending numeric order of the
     * annotation's order value; each column is followed by a literal '_'. When two
     * fields declare the same order value, the one processed later replaces the
     * earlier one.
     *
     * The three values are inserted into the statement text as quoted literals.
     *
     * @param batchNo    batch number of the run
     * @param sourceCode source code, i.e. the bean name
     * @param flag       side flag
     * @return the assembled select statement
     * @throws IllegalStateException when RDetail has no field annotated with RFiledName,
     *                               because no valid statement can be built then
     */
    private String getSelectResourceSql(String batchNo,String sourceCode,String flag ) {
        // Integer keys give numeric ordering; string keys would sort "10" before "2"
        Map<Integer,Field> orderedFields = new TreeMap<>();
        for (Field field : RDetail.class.getDeclaredFields()) {
            RFiledName fieldAnnotation = field.getDeclaredAnnotation(RFiledName.class);
            if (fieldAnnotation != null) {
                orderedFields.put(fieldAnnotation.order(), field);
            }
        }
        if (orderedFields.isEmpty()) {
            throw new IllegalStateException("RDetail has no field annotated with @RFiledName");
        }

        StringBuilder sql = new StringBuilder("select CONCAT(");
        for (Field field : orderedFields.values()) {
            sql.append("r.").append(underline(field.getName())).append(",").append("'_'").append(",");
        }
        // drop the comma left behind by the last column
        sql.setLength(sql.length() - 1);
        sql.append(") from r_resources r where r.batch_no = '").append(batchNo)
           .append("' and r.platform = '").append(flag)
           .append("' and r.source_code = '").append(sourceCode).append("'");
        return sql.toString();
    }

    /**
     * Generates a new batch number.
     *
     * @return the code produced by CodeUtils.generateCode()
     */
    private String createBatchNo() {
        final String generated = CodeUtils.generateCode();
        return generated;
    }

    /**
     * Checks whether the current job is already being executed.
     *
     * The body is empty: this method performs no check. The check for a running job
     * that is visible in this class is done in init via a Redis marker key.
     *
     * @param sourceEntity source definition of the job (currently unused)
     */
    private void checkJobExist(SourceEntity sourceEntity) {


    }

    /**
     * Verifies that both parsing beans are registered and resolves them.
     *
     * @param pBeanName name of the platform-side parsing bean
     * @param qBeanName name of the channel-side parsing bean
     * @return the resolved beans, keyed by side flag + bean name so that the same
     *         bean name can be used on both sides
     * @throws ServiceException BEAN_NOT_EXIST when either bean is not registered
     */
    private Map<String,ReconService> checkBean(String pBeanName, String qBeanName) {
        boolean anyMissing = !SpringContextHolder.containsBean(pBeanName)
                || !SpringContextHolder.containsBean(qBeanName);
        if (anyMissing) {
            throw new ServiceException(BEAN_NOT_EXIST);
        }
        return initBean(pBeanName, qBeanName);
    }


    /**
     * Resolves both parsing beans and stores them under side-prefixed keys.
     *
     * @param pBeanName name of the platform-side parsing bean
     * @param qBeanName name of the channel-side parsing bean
     * @return map with the entries PLAT + pBeanName and CHANNEL + qBeanName
     */
    private Map<String,ReconService>  initBean(String pBeanName,String qBeanName){
        Map<String,ReconService> services = new HashMap<>(BEAN_NUM);
        // the side prefix keeps the two entries apart even when both names are equal
        services.put(PLAT + pBeanName, SpringContextHolder.getBean(pBeanName));
        services.put(CHANNEL + qBeanName, SpringContextHolder.getBean(qBeanName));
        return services;
    }

    /**
     * Looks up the parsing bean of one side in the map built by initBean.
     *
     * @param reconServiceMap parsing beans keyed by side flag + bean name
     * @param beanName        name of the parsing bean
     * @param flag            side flag (PLAT or CHANNEL)
     * @return the bean stored under flag + beanName, or null when there is none
     */
    private ReconService getBean(Map<String,ReconService> reconServiceMap,String beanName,String flag){
        String mapKey = flag + beanName;
        return reconServiceMap.get(mapKey);
    }

    /**
     * Converts a camelCase name to snake_case.
     *
     * Every ASCII upper-case letter (A-Z) is replaced by an underscore followed by
     * the matching lower-case letter; all other characters are copied unchanged.
     * A leading upper-case letter therefore produces a leading underscore.
     *
     * The conversion is done by character arithmetic, so it does not depend on the
     * default locale (a locale-sensitive lower-casing maps "I" to a dotless "ı"
     * under a Turkish locale, which would yield a wrong column name).
     *
     * @param str the camelCase name; must not be null
     * @return a new buffer holding the snake_case form
     * @throws NullPointerException when str is null
     */
    public static StringBuffer underline(String str) {
        StringBuffer sb = new StringBuffer(str.length() + 8);
        for (int i = 0; i < str.length(); i++) {
            char c = str.charAt(i);
            if (c >= 'A' && c <= 'Z') {
                sb.append('_').append((char) (c - 'A' + 'a'));
            } else {
                sb.append(c);
            }
        }
        return sb;
    }



}